#!/usr/local/python36/bin/python3.6
# -*- coding:utf-8 -*-
#

from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt
from django.http import HttpResponse
from pykafka import KafkaClient
from pykafka.balancedconsumer import BalancedConsumer
from pykafka.simpleconsumer import OwnedPartition, OffsetType
import struct, io, socket, sys, threading, time, json, os, sched
from .tools.ipquery import Ip2Region
from .tools.addmysql import add_province, add_city, sel_province, sel_city, top_province, after_province
from .tools.addmongo import insert_mongo_all
from datetime import datetime
from pymongo import MongoClient


# Region names passed to sel_province() to load the per-region counters.
city_list = ["北京", "天津", "上海", "重庆", "河北", "河南", "云南", "辽宁", "黑龙江", "湖南", "安徽", "山东", "新疆", "江苏", "浙江", "江西", "湖北",
             "广西", "甘肃", "山西", "内蒙古", "陕西", "吉林", "福建", "贵州", "广东", "青海", "西藏", "四川", "宁夏", "海南", "台湾", "香港", "澳门"]
# NOTE(review): city_name, other_city_dist1 and other_city_dist2 are not
# referenced by the code visible in this module -- confirm before removing.
city_name = "四川"
other_city_dist1 = {}
other_city_dist2 = {}
# Queried once at import time; city_obtain() returns this module-level value,
# while china_map() runs its own sel_province() query per request.
city_dist = sel_province(city_list)
# Queue of batches: http_obtain() appends one list of request origins per run
# and obtain_kafka() pops batches from the front.
kafka_list = []

# Connect to Kafka (broker addresses are hard-coded here).
client = KafkaClient(hosts="10.28.148.136:9092,10.28.148.97:9092")
topic = client.topics[b'requestStatistics']
# NOTE: despite its name, this consumer is created with get_simple_consumer().
balanced_consumer = topic.get_simple_consumer(
    consumer_group=b'china',
    # Start from the beginning of the topic:
    # reset_offset_on_start=True,
    auto_commit_enable=True,
)
# Connect to MongoDB.
# NOTE(review): the credentials are hard-coded in the connection URI; consider
# moving them to settings or environment variables.
try:
    # Authenticate through the URI (password without special characters).
    mongo_client = MongoClient('mongodb://admin:p-0p-0p-0@10.46.70.164:27017/kafkadb?')
    # Authenticate explicitly (password containing special characters):
    # mongo_user = 'admin'
    # mongo_pwd = 'p-0p-0p-0'
    # db_name = 'kafkadb'
    # coll_name = 'requestStatistics'
    # mongo_client = MongoClient("120.27.199.75")
    # mongo_client[db_name].authenticate(mongo_user,mongo_pwd,db_name)

    # Select the database: kafkadb
    newdb = mongo_client.kafkadb
except Exception:
    # Catch Exception instead of using a bare except so KeyboardInterrupt and
    # SystemExit are not swallowed while the module is being imported.
    # NOTE: newdb stays undefined after a failure, so http_obtain() will
    # raise NameError when it tries to use it.
    print('mongodb数据库连接不上')

@csrf_exempt
def mother_top(request):
    """Return the name/data lists from after_province() as a JSON response.

    Only AJAX POST requests whose ``status`` field equals ``"2"`` are served.
    Any other request receives an empty 400 response; previously the view
    fell through and returned None, which Django rejects with a ValueError.
    """
    is_top_request = (
        request.is_ajax()
        and request.method == 'POST'
        and request.POST.get('status') == "2"
    )
    if not is_top_request:
        return HttpResponse(status=400)
    shop_top_name, shop_top_data = after_province()
    return HttpResponse(json.dumps({'shop_top_name': shop_top_name, 'shop_top_data': shop_top_data}))

# Strip these suffixes from the name: '省', '市', '自治区', '回族自治区', '壮族自治区', '维吾尔自治区'
def Charchange(city_name):
    """Split a ``|``-separated region string and trim the administrative suffix.

    The third field (index 2) has one of the known suffixes removed and is
    returned together with the fourth field (index 3), unchanged.
    Presumably these are the province and city of an ip2region result --
    confirm against the caller.

    Args:
        city_name: Region string with at least four ``|``-separated fields.

    Returns:
        ``[trimmed_third_field, fourth_field]`` when the third field ends
        with a known suffix, otherwise an empty list (also for input with
        fewer than four fields).
    """
    # Longer "...自治区" variants come before the plain '自治区' so the most
    # specific suffix is removed.
    suffixes = ('省', '市', '回族自治区', '壮族自治区', '维吾尔自治区', '自治区')
    fields = city_name.split("|")
    if len(fields) < 4:
        return []
    name, name2 = fields[2], fields[3]
    for suffix in suffixes:
        # endswith + slicing removes exactly one trailing suffix; str.strip()
        # would strip a *set of characters* from both ends of the name.
        if name.endswith(suffix):
            return [name[:len(name) - len(suffix)], name2]
    return []

def http_obtain(batch_size=1000):
    """Consume one batch of request records from Kafka and publish it.

    Each message value is decoded as UTF-8 JSON. Records whose ``url`` ends in
    ``getCode.json`` are skipped. Once ``batch_size`` records have been
    accepted, one further message is printed and processed, and then the
    loop stops. A ``None`` message also ends the batch.

    The ``origin`` values of the accepted records are appended to
    ``kafka_list`` as a single batch and the full records are passed to
    ``insert_mongo_all``.

    Args:
        batch_size: Number of accepted records after which the batch is
            closed (default 1000).

    Returns:
        The module-level ``kafka_list``.
    """
    http_request = []
    request_data_list = []
    accepted = 0
    balanced_consumer.start()
    try:
        for message in balanced_consumer:
            if message is None:
                # Nothing to decode; treat it as the end of this batch.
                break
            batch_full = accepted >= batch_size
            text = message.value.decode('utf-8')
            if batch_full:
                print(text)
            record = json.loads(text)
            if record["url"].split('/')[-1] != "getCode.json":
                request_data_list.append(record)
                http_request.append(record['origin'])
                accepted += 1
            if batch_full:
                break
    finally:
        # Always stop the consumer, even when decoding or parsing fails.
        balanced_consumer.stop()
    kafka_list.append(http_request)
    insert_mongo_all(newdb, request_data_list)
    return kafka_list

def obtain_kafka(kafka_list, poll_interval=0.1):
    """Wait until ``kafka_list`` holds a batch, then remove and return the oldest.

    Args:
        kafka_list: List used as a queue of batches; it is mutated in place.
        poll_interval: Seconds to sleep between checks while the list is
            empty, so that waiting does not spin at full CPU.

    Returns:
        The first element of ``kafka_list``.
    """
    while not kafka_list:
        time.sleep(poll_interval)
    print("====%s====" % len(kafka_list))
    return kafka_list.pop(0)

# Shared Ip2Region instance; city_obtain() calls btreeSearch() on it for each
# request origin taken from a Kafka batch.
city_map = Ip2Region()

def city_obtain():
    """Count the requests of one Kafka batch by the names Charchange() returns.

    Takes the next batch of request origins from ``kafka_list`` (waiting
    until one is available), looks up each origin with ``city_map`` and
    tallies the two names returned by ``Charchange`` into the module-level
    dicts ``add_p_data`` (first name) and ``add_c_data`` (second name).
    Both dicts are rebound to fresh dicts on every call.

    Entries for which ``Charchange`` yields no pair are printed and skipped.
    Entries where either name is ``'0'`` are skipped silently.

    Returns:
        The module-level ``city_dist``.
    """
    global add_c_data
    global add_p_data
    origins = obtain_kafka(kafka_list)
    add_p_data = {}
    add_c_data = {}
    for origin in origins:
        info = city_map.btreeSearch(origin)
        region = info['region'].decode('utf-8')
        names = Charchange(region)
        if len(names) < 2:
            # No recognised suffix: report the raw region and move on.
            print(region, names, '======================')
            continue
        province, city = names[0], names[1]
        if province == '0' or city == '0':
            continue
        add_p_data[province] = add_p_data.get(province, 0) + 1
        add_c_data[city] = add_c_data.get(city, 0) + 1
    print(add_p_data)
    print(add_c_data)
    return city_dist

# Periodic task 1: fetch data from Kafka
schedules = sched.scheduler(time.time, time.sleep)
@csrf_exempt
def start_thread():
    """Run http_obtain() in a worker thread unless enough batches are queued.

    When ``kafka_list`` already holds three or more batches, nothing is
    fetched and a notice is printed. Otherwise the worker thread is started
    and joined, so this call returns only after http_obtain() has finished.
    """
    if len(kafka_list) >= 3:
        print('============================积累数据过多，线程等待===============================')
        return
    worker = threading.Thread(target=http_obtain)
    worker.start()
    worker.join()
    print('=========================threading1111111============================')

@csrf_exempt
def perform(inc):
    """Scheduler action: queue the next run in ``inc`` seconds, then fetch.

    Args:
        inc: Delay in seconds before this action is run again.
    """
    # Re-arm before fetching, so the next run is already queued while
    # start_thread() waits for its worker.
    schedules.enter(delay=inc, priority=0, action=perform, argument=(inc,))
    start_thread()

@csrf_exempt
def mymain(inc=3):
    """Start the periodic Kafka fetch task if it is not already scheduled.

    If the ``schedules`` queue is empty, perform() is queued to run
    immediately with a 6-second repeat interval and the scheduler is run
    from this call. Otherwise a notice is printed.

    NOTE(review): ``inc`` is never read. If this function is routed as a
    Django view, the request object arrives in this parameter -- confirm.
    NOTE(review): perform() re-queues itself on every run, so schedules.run()
    presumably does not return while the task keeps running -- confirm.

    Returns:
        An empty HttpResponse.
    """
    if not schedules.empty():
        print("=========周期任务已经存在==========")
    else:
        schedules.enter(delay=0, priority=0, action=perform, argument=(6,))
        schedules.run()
    return HttpResponse()

# Periodic task 2: insert data into the database
schedules1 = sched.scheduler(time.time, time.sleep)
@csrf_exempt
def start_mysql():
    """Aggregate one Kafka batch and hand the counters to the MySQL helpers.

    city_obtain() runs first in its own thread and is joined. The
    module-level ``add_p_data`` and ``add_c_data`` it fills are then passed
    to add_province() and add_city(), which run in two parallel threads.
    After both have finished, the function sleeps for 10 seconds.
    """
    aggregator = threading.Thread(target=city_obtain)
    aggregator.start()
    aggregator.join()

    writers = [
        threading.Thread(target=add_province, args=(add_p_data,)),
        threading.Thread(target=add_city, args=(add_c_data,)),
    ]
    for writer in writers:
        writer.start()
    for writer in writers:
        writer.join()

    time.sleep(10)
    print('=========================threading22222222============================')

@csrf_exempt
def writemysql(inc):
    """Scheduler action: queue the next run in ``inc`` seconds, then write.

    Args:
        inc: Delay in seconds before this action is run again.
    """
    # Re-arm before writing, so the next run is already queued while
    # start_mysql() is busy.
    schedules1.enter(delay=inc, priority=0, action=writemysql, argument=(inc,))
    start_mysql()

@csrf_exempt
def writed(inc=3):
    """Start the periodic database-write task if it is not already scheduled.

    If the ``schedules1`` queue is empty, writemysql() is queued to run
    immediately with a 6-second repeat interval and the scheduler is run
    from this call. Otherwise a notice is printed.

    NOTE(review): ``inc`` is never read. If this function is routed as a
    Django view, the request object arrives in this parameter -- confirm.
    NOTE(review): writemysql() re-queues itself on every run, so
    schedules1.run() presumably does not return while the task keeps
    running -- confirm.

    Returns:
        An empty HttpResponse.
    """
    if not schedules1.empty():
        print("=========周期任务已经存在==========")
    else:
        schedules1.enter(delay=0, priority=0, action=writemysql, argument=(6,))
        schedules1.run()
    return HttpResponse()

def top_cycle(city_dist):
    """Return the five entries of ``city_dist`` with the largest values.

    Args:
        city_dist: Mapping of name to a sortable value.

    Returns:
        A ``(top_map, top_map_list)`` tuple: a dict of at most five entries
        and the list of its keys, both in descending value order. Entries
        with equal values keep their original relative order.
    """
    ranked = sorted(city_dist.items(), key=lambda pair: pair[1], reverse=True)
    leaders = ranked[:5]
    return dict(leaders), [name for name, _ in leaders]

@csrf_exempt
def china_map(request):
    """Serve the map page and its AJAX data endpoints.

    AJAX POST requests are dispatched on the ``status`` field:

    * ``"0"``: JSON with the sel_province() counters (``map``) and a quarter
      of their maximum (``max_city``).
    * ``"1"``: JSON with the map file named by ``mapname`` (``othermap``),
      the sel_city() counters for ``mapcity`` (``othercity``) and their
      maximum (``othermax``). A missing ``mapname``, or one that resolves
      outside the map directory, yields an empty 400 response.

    Every other request renders ``map.html`` with the sel_province()
    counters and the after_province() name/data lists.
    """
    if request.is_ajax() and request.method == 'POST':
        status = request.POST.get('status')
        if status == "0":
            city_dist = sel_province(city_list)
            # default=0 keeps an empty result from raising ValueError.
            max_city = max(city_dist.values(), default=0) / 4
            return HttpResponse(json.dumps({
                "map": city_dist,
                "max_city": max_city
            }))
        if status == "1":
            map_name = request.POST.get('mapname')
            map_city = request.POST.get('mapcity')
            if not map_name:
                return HttpResponse(status=400)
            map_dir = os.path.realpath('/usr/local/project/web02/china_map/templates/map')
            map_file = os.path.realpath(os.path.join(map_dir, map_name))
            # mapname is client-supplied: reject absolute paths and "../"
            # sequences that would escape the map directory.
            if not map_file.startswith(map_dir + os.sep):
                return HttpResponse(status=400)
            with open(map_file, 'rb') as map_read:
                map_json = json.loads(map_read.read())
            other_city_dist3 = sel_city(map_city)
            other_max_city = max(other_city_dist3.values(), default=0)
            print(2, other_city_dist3)
            return HttpResponse(json.dumps({
                "othermap": map_json,
                "othercity": other_city_dist3,
                "othermax": other_max_city
            }))
    # Page render (also reached by AJAX POSTs with any other status).
    city_dist = sel_province(city_list)
    max_city = max(city_dist.values(), default=0) / 4
    shop_top_name, shop_top_data = after_province()
    return render(request, 'map.html',
                  {'map': json.dumps(city_dist), 'max_city': json.dumps(max_city),'shop_top_name': json.dumps(shop_top_name), 'shop_top_data': json.dumps(shop_top_data)})
